fix(cdk): fix state emit for ConcurrentPerPartitionCursor #539
/autofix
📝 Walkthrough
The updates introduce a conditional check in the concurrent partition cursor logic to skip emitting state messages under specific global cursor and throttling conditions. A redundant blank line is removed from a factory method. Unit tests are expanded to cover scenarios with global cursors and no incremental dependencies, including assertions on emitted state message counts.
Sequence Diagram(s)
sequenceDiagram
participant TestRunner
participant MockAPI
participant ConcurrentPartitionCursor
TestRunner->>ConcurrentPartitionCursor: Initiate sync with global cursor and no dependency
loop For each partition
ConcurrentPartitionCursor->>MockAPI: Fetch partition data (posts, comments, votes)
MockAPI-->>ConcurrentPartitionCursor: Return paginated responses
end
alt Throttling enabled & using global cursor & parent state empty
ConcurrentPartitionCursor-->>TestRunner: Skip emitting state message
else
ConcurrentPartitionCursor-->>TestRunner: Emit state message
end
TestRunner->>TestRunner: Assert number of emitted state messages
Would you like me to create a diagram comparing the old and new state message emission flows as well, or is this overview sufficient for your needs? Wdyt?
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
245-247: The conditional skip looks good for preventing redundant emissions. This addition intelligently prevents double-emitting state messages when using global cursors without a parent state. This should effectively fix the issue described in the PR where state messages were emitted multiple times at the end of synchronization.
Are you confident this won't affect any other scenarios where we need state emission? Perhaps we should add a debug log here to help with troubleshooting if someone encounters issues, wdyt?
  if self._use_global_cursor and not self._parent_state:
+     logger.debug(f"Skipping state emission for global cursor with no parent state for stream {self._stream_name}")
      return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (0 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py (1)
test_incremental_parent_state_no_incremental_dependency(1491-1534)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (7)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (7)
312-321: Good addition of test manifests for global cursor scenarios! This new manifest variant properly configures a global cursor with disabled incremental dependencies, which is essential for testing the fix for duplicate state emission with global cursors. Great job covering this case explicitly.
329-329: Smart extension to the test helper function! Adding the optional state_count parameter allows you to verify the exact number of state messages emitted, which directly addresses the core issue being fixed. This is a clean way to validate the behavior without invasive changes.
369-369: Good assertion for state message count validation. This new assertion ensures the correct number of state messages are emitted when the parameter is provided. It gracefully handles cases where state count isn't important by using a default True assertion.
448-449: Properly updated parameter lists to support state count validation. You've consistently updated both the parameterized test definition and the function signature to include the new state_count parameter. This maintains the pattern used throughout the file.
Also applies to: 1015-1016
759-1010: Excellent test case for global cursor with no incremental dependency! This comprehensive test verifies that when using a global cursor with no incremental dependency, only a single state message is emitted (state_count=1), directly addressing the issue described in the PR. The test data setup is thorough and validates the specific fix implemented.
1037-1037: Correctly passes state_count to the test helper. The state_count parameter is properly passed to run_mocked_test, ensuring validation happens as expected for each test case.
1-1: ✅ Verification successful
Ruff formatting issue needs to be fixed.
The pipeline shows a Ruff formatting issue. Would you mind running 'ruff format' on this file before merge? This will ensure consistent code style across the codebase.
🏁 Script executed:
#!/bin/bash
ruff format unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
Length of output: 113
Formatting issues addressed
I ran ruff format on this file and it was reformatted successfully. Everything looks consistent now. Ready to merge? wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
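The optional state count check described in the review comments above might look like the following sketch. The helper name `assert_state_count` and the dict-based message shape (a `"type"` key equal to `"STATE"`) are assumptions made for illustration; the PR's actual helper is run_mocked_test, whose body is not shown in this conversation.

```python
from typing import Any, Dict, List, Optional


def assert_state_count(
    messages: List[Dict[str, Any]], state_count: Optional[int] = None
) -> int:
    """Count state messages and, if requested, assert the exact total.

    Hypothetical helper: when state_count is None the check is skipped,
    so existing test cases that do not care about the count keep passing.
    """
    count = sum(1 for message in messages if message.get("type") == "STATE")
    if state_count is not None:
        assert count == state_count, (
            f"expected {state_count} state messages, got {count}"
        )
    return count
```

Making the parameter optional keeps older parameterized cases unchanged while letting the new global cursor case pin the count to 1.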
/autofix
Maxime Carbonneau-Leclerc (maxi297) left a comment
LGTM!
What
If the parent stream of a per-partition stream commits states, we may emit the state more than once at the end of the sync for a global cursor.
How
Added a condition in ConcurrentPerPartitionCursor to avoid emitting the state when the global cursor is enabled, there's no parent_stream in the state, and throttling is applied.
Summary by CodeRabbit
Bug Fixes
Tests
Style
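The condition described under How can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the CDK's implementation: the class `CursorSketch`, its attributes, and the `emitted` list are hypothetical stand-ins. Only the three-part condition (throttling applied, global cursor enabled, no parent state) comes from the description.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class CursorSketch:
    """Hypothetical stand-in for a per-partition cursor; not the CDK class."""

    use_global_cursor: bool = False
    parent_state: Dict[str, Any] = field(default_factory=dict)
    emitted: List[Dict[str, Any]] = field(default_factory=list)

    def emit_state(self, state: Dict[str, Any], throttle: bool = True) -> bool:
        """Record a state message unless the skip condition holds.

        Returns True when a message was emitted, False when it was skipped.
        """
        # Skip only when all three conditions hold: throttling is applied,
        # the global cursor is in use, and there is no parent state.
        if throttle and self.use_global_cursor and not self.parent_state:
            return False
        self.emitted.append(state)
        return True
```

In this sketch, a throttled call on a global cursor with an empty parent state is skipped, while an unthrottled call still emits, so only one state message is recorded.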